S3 Event Trigerから共通State Machine(Step Function)を呼び出す方法を検証する
データアナリティクス事業本部のnkhrです。今回のブログでは、ETLの前処理を想定し、IFごとのS3ファイルに対して、共通の処理を実行するための構成(前処理自体は行っていません)を検証します。
具体的には、S3 Event Triger → Step Function(特定ファイル時にGlue呼び出し)→Glue Jobでどのような情報を取得して、前処理に渡せるかを検証します。今回はCloud Trailのイベント(S3:PutObject)をEventBridgeで検知し、EventBridgeからStep Functionを起動します。
EventBridgeやStep FunctionなどAWSサービスの仕様について記載している箇所は、2022/01時点の仕様に基づいて記載しています。
S3 Bucketとサンプルデータ作成
S3バケットは、以下の構成とします。upload_completedファイルはデータのアップロード完了後、最後にアップロードされる完了マークファイルとします。この完了マークファイルのアップロードを検知して、Glue Jobを実行します。
- if_001/
- yyyy=2022/mm=01/dd=03/
- accesslog1.jsonl
- accesslog2.jsonl
- upload_completed ※ファイルアップロード完了マークファイル
- yyyy=2022/mm=01/dd=04/
- accesslog1.jsonl
- upload_completed
- yyyy=2022/mm=01/dd=03/
accesslogのファイル内容は以下のようなファイルとします。(1ファイルの行数は6行)
EventBridgeへのイベント通知
CloudTrailイベントの通知設定
S3 EventをEvent Bridgeで取得するために、CloudTrailでS3 Eventを記録します。CloudTrailの画面から新しいCloudTrailを作成します。2021/12以降は、S3 Event(Object Level)を直接Event Bridgeで取得できるため、証跡の取得以外の目的でCloudTrailを利用する必要はありません。
CloudTrailでS3PutObjectを取得するため、Data Eventの記録を設定します。Data EventはCloud TrailのEvent Hisotryからは確認できないため、事前確認したい場合は、S3バケットのファイルを直接確認するか、Cloud Watch Logsに出力して確認します。
S3から直接イベント通知する設定(追記)
S3バケットのPropertiesから、Event Bridgeのイベント通知をONにすることで、CloudTrailなしにEventを送信できます。CloudTailでの証跡記録が必要ない場合は、こちらの方法が利用できます。ONに変更後にEventBridge側でイベントが取得できるようになるまでに、5分程度はかかるようです。(公式サイト:Using the S3 console参照)
Step Functionのサンプル作成
Step Functionのサンプルを作成します。後程Glue Jobの作成と起動を設定しますが、このタイミングでは何も行わないState Machineを作成します。
State Machine名を「s3event-common」とします。Roleは「Create new role」から自動で作成します。自動作成を選択すると「StepFunctions-s3event-common-role-[uuid]」という名称で作成されます。このRoleには、デフォルトでXRAYに関する以下の4つのPermission(2022/01時点)が設定されています。
- xray:PutTraceSegments
- xray:PutTelemetryRecords
- xray:GetSamplingRules
- xray:GetSamplingTargets
EventBridgeでRule作成
Amazon EventBridgeの[Create Rule]からS3 PutObjectイベント用のRuleを作成し、イベント発生時にStateMachineを起動します。
Event Pattern
特定S3バケットのPutObjectイベントを検知するEvent Patternを設定します。EventPatternはワイルドカードやSuffix一致が使えないため(2022/01時点)、すべてのPutObjectイベントを検知します。後続のStateMachineで、完了マークファイル「upload_completed」を判別します。
- CloudTrailからのEventを取得する場合
- S3から直接EventBridgeにイベントを取得する場合(追記)
- detail_type: ["Object Created"]には、PutObject以外にも以下のActionイベントが含まれます。
- PutObject
- POST Object
- CopyObject
- CompleteMultipartUpload
Event Target
イベントターゲットには、作成したState Machineを指定します。Event Target用のRoleを自動作成とした場合、対象のターゲット(今回はState Machine)を起動するためのRoleが自動で作成されます。
その他の設定
その他として、以下の設定が可能です。今回は全てデフォルトにしました。
- EventBus選択
- Eventを受け取るバスをカスタム作成した場合は、選択可能。EventBusごとにResource Policyを指定できます。デフォルトでは「default」EventBus(Policy設定なし)があります。
- 選択したEventBusのRuleを適用するかのEnable/Disableも設定可能です。
- Retry Policy設定
- 最大24HまでRetryが可能です。Retryの設定は以下の2つを定義できます。
- Maximum age of event (HH:mm)
- 処理できなかったイベントを保存する期間を指定。デフォルトは24H。
- Retry attempts
- エラー発生時にリトライする最大回数を指定。デフォルトは185回。
- Dead-letter queue設定
- 処理できなかったイベントをSQSに送信するかの設定。下記の3パターンを指定可能。
- None(デフォルト)
- 同じアカウントの既存のSQSに送信(ドロップダウンリストからSQSを指定)
- 他のアカウントのSQSに送信(SQSのARNを指定、Resource base policyが必要)
- 処理できなかったイベントをSQSに送信するかの設定。下記の3パターンを指定可能。
Step FunctionからGlueジョブ呼び出し設定
ここまでの設定で、対象バケットにファイルをアップロードすると、StateMachineが起動することを確認できます。
次に、StateMachineの設定を変更し、Glueジョブを呼び出します。StateMachineへのInputとして、CloudTrailで取得されたEventのJSONが渡されるため、その一部(下記)をGlue Jobに渡します。
CloudTrailからのEvent JSONと、S3から直接Event Bridgeに送られるJSONファイルは、内容が異なります。
初回の投稿ブログでは、CloudTrailのEvent JSONを利用してStateMachine、Glue Jobを実行していました。現バージョンのブログでは、S3から直接Event Bridgeに送られるJSONファイルを使って、StateMachineとGlue Jobを実行するようにコードを修正しています。
- S3 to EventBridge Jsonファイルの場合(追加)
- $.account:AWSアカウント番号
- $.region:Region
- $.time : 実行時刻
- $.detail.bucket.name: BucketName
- $.detail.object.key: オブジェクトKey名
- CloudTrail Event Jsonファイルの場合
- $.account:AWSアカウント番号
- $.region:Region
- $.time : 実行時刻
- $.detail.userIdentity.arn :実行User/RoleのARN(S3からの直接Event送信ではUser/Roleは取得不可)
- $.detail.resources[0].type:Type
- $.detail.resources[0].ARN:アップデートKeyのARN(このARNにBucket名とオブジェクトKey名が含まれる)
Glue Jobの作成
Glue Jobでは以下を作成します。
- [事前準備] Glue JobからS3やCloudWatch LogsにアクセスできるRoleを作成(※通常は以下のPolicyの中で必要なリソースとActionに絞る)
- AmazonS3FullAccess
- AWSGlueServiceRole
- Glue Jobを作成
- Job Name: test_job (Step Functionから起動するために利用)
- Max Concurency:3
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ["JOB_NAME", "accountid", "region", "bucketname", "object_key"]) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args["JOB_NAME"], args) # print arguments print(f"accountid: {args['accountid']}") print(f"region: {args['region']}") print(f"bucketname: {args['bucketname']}") print(f"object_key: {args['object_key']}") bucket = args['bucketname'] target_dir = args['object_key'].replace("upload_completed","") # upload_completedのファイルのパスにあるすべてのファイルを取得 # upload_completedのみ除く df = glueContext.create_dynamic_frame.from_options( format_options={"multiline": False}, connection_type="s3", format="json", connection_options={ "paths": [f"s3://{bucket}/{target_dir}"], 'exclusions': f"[\"s3://{bucket}/{target_dir}/upload_completed\"]", "recurse": True } ) # ファイル行数を表示 print('Count: {0}'.format(df.count())) # ファイル内のスキーマを表示 df.printSchema() job.commit()
Step FunctionからGlue Jobの起動設定
- Glue Job実行Policyの追加
- 最初に作成したStep FunctionのRole「StepFunctions-s3event-common-role-[uuid]」に以下のPolicyを追加
- Passにしていた部分をGlue Jobの起動に置き換え
{ "Comment": "A description of my state machine", "StartAt": "Choice", "States": { "Choice": { "Type": "Choice", "Choices": [ { "Variable": "$.detail.object.key", "StringMatches": "*/upload_completed", "Next": "Glue StartJobRun" } ], "Default": "Pass" }, "Pass": { "Type": "Pass", "End": true }, "Glue StartJobRun": { "Type": "Task", "Resource": "arn:aws:states:::glue:startJobRun.sync", "Parameters": { "JobName": "test_job", "Arguments": { "--accountid.$": "$.account", "--region.$": "$.region", "--time.$": "$.time", "--bucketname.$": "$.detail.bucket.name", "--object_key.$": "$.detail.object.key" } }, "End": true, "ResultPath": "$.result" } } }
今回の実行では、Standardワークフロー(状態遷移数で課金)で実行しましたが、この構成の場合はファイル数分の遷移が発生するため、Expressワークフロー(リクエスト数とメモリ利用時間)の方が適しているかもしれません。
動作確認
実行手順
以下の手順で実行し、GlueからCloudWatch Logsに出力されたログを確認する。
- 以下の2つのフォルダをバケット配下に作成
- if_001/yyyy=2022/mm=01/dd=03/
- if_001/yyyy=2022/mm=01/dd=04/
-
以下の2ファイルをアップロード
- if_001/yyyy=2022/mm=01/dd=03/accesslog1.jsonl
- if_001/yyyy=2022/mm=01/dd=03/accesslog2.jsonl
-
完了マークファイルをアップロード
- if_001/yyyy=2022/mm=01/dd=03/upload_completed
-
以下の1ファイルをアップロード
- if_001/yyyy=2022/mm=01/dd=04/accesslog1.jsonl
- 完了マークファイルをアップロード
- if_001/yyyy=2022/mm=01/dd=04/upload_completed
完了確認
Glue jobが2回実行され、それぞれの実行ログの結果として取得した行数が1回目のジョブは12行、2回目のジョブは6行となっていることを確認する。
<Glue Job1結果>
<Glue Job2結果>
まとめ
今回は、S3の完了マークファイルのアップロードを検知し、State Machine⇒Glueジョブ呼び出す仕組みを検証しました。
イベント駆動の仕組みは、一度作成すれば、同じような処理が発生した場合に自動処理できるため、上手く仕組み化できれば有用だと思います。ただし、実際にイベント駆動でデータ加工を行う場合は、イベントのロストや再送などに対して、ジョブ実行の冪等性担保や、イベントが実行されていない場合の検知の仕組み(どこでチェックするかはケースバイケース)が必要になります。
以上、nkhrでした。